今天是第十九天我們可以寫一個k8s交通分析資料庫管理系統,以下是我的程式碼
我們來做一個基本架構,以 PostgreSQL 作為資料庫,並搭配一個基於 Flask 的 REST API 作為數據入口,最後將整個系統打包進 Kubernetes。
首先,創建一個 PostgreSQL 部署在 K8s 上:
postgresql-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
      - name: postgres
        image: postgres:latest
        ports:
        - containerPort: 5432
        env:
        - name: POSTGRES_DB
          value: "traffic_db"
        - name: POSTGRES_USER
          value: "admin"
        - name: POSTGRES_PASSWORD
          value: "password"
        volumeMounts:
        - name: postgres-storage
          mountPath: /var/lib/postgresql/data
      volumes:
      - name: postgres-storage
        persistentVolumeClaim:
          claimName: postgres-pv-claim
---
apiVersion: v1
kind: Service
metadata:
  name: postgres
spec:
  ports:
    - port: 5432
  selector:
    app: postgres
postgres-pv-claim.yaml:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: postgres-pv-claim
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
這個設定將 PostgreSQL 容器化並在 K8s 集群內部署,同時配置一個持久化卷(PV)來保存資料。
接下來,建立一個簡單的 Flask 應用,作為交通數據的 REST API。
app.py:
from flask import Flask, request, jsonify
import psycopg2
app = Flask(__name__)
# 連接 PostgreSQL 資料庫
def connect_db():
    conn = psycopg2.connect(
        dbname="traffic_db",
        user="admin",
        password="password",
        host="postgres",  # 這是 K8s 的 service 名字
        port="5432"
    )
    return conn
@app.route('/add_traffic_data', methods=['POST'])
def add_traffic_data():
    data = request.json
    try:
        conn = connect_db()
        cur = conn.cursor()
        # 將數據插入資料庫
        cur.execute("""
            INSERT INTO traffic_data (location, timestamp, vehicle_count)
            VALUES (%s, %s, %s)
        """, (data['location'], data['timestamp'], data['vehicle_count']))
        conn.commit()
        cur.close()
        conn.close()
        return jsonify({"status": "success"}), 201
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/get_traffic_data', methods=['GET'])
def get_traffic_data():
    location = request.args.get('location')
    try:
        conn = connect_db()
        cur = conn.cursor()
        cur.execute("SELECT * FROM traffic_data WHERE location = %s", (location,))
        rows = cur.fetchall()
        cur.close()
        conn.close()
        return jsonify({"data": rows}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
將 Flask 應用打包成 Docker 容器。
Dockerfile:
Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "app.py"]
requirements.txt:
Flask==2.0.1
psycopg2==2.9.1
將 Flask 應用部署到 Kubernetes 中:
flask-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flask-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flask-app
  template:
    metadata:
      labels:
        app: flask-app
    spec:
      containers:
      - name: flask-app
        image: <your-flask-image>
        ports:
        - containerPort: 5000
---
apiVersion: v1
kind: Service
metadata:
  name: flask-app
spec:
  type: LoadBalancer
  ports:
    - port: 5000
  selector:
    app: flask-app
將這個應用部署到 Kubernetes,並設置 LoadBalancer 類型的 Service 來暴露 Flask API。
記得在 PostgreSQL 中創建一個表格來保存交通數據。
初始化 SQL 指令:
CREATE TABLE traffic_data (
    id SERIAL PRIMARY KEY,
    location VARCHAR(255),
    timestamp TIMESTAMP,
    vehicle_count INT
);
你可以使用 PostgreSQL 的 psql 命令或 pgAdmin 來運行這段 SQL 指令。
完成所有配置後,通過以下指令將應用部署到 Kubernetes 上:
kubectl apply -f postgresql-deployment.yaml
kubectl apply -f postgres-pv-claim.yaml
kubectl apply -f flask-deployment.yaml
postgresql-deployment.yaml)這段 YAML 文件定義了 PostgreSQL 資料庫在 Kubernetes 上的部署。
apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
      - name: postgres
        image: postgres:latest
        ports:
        - containerPort: 5432
        env:
        - name: POSTGRES_DB
          value: "traffic_db"
        - name: POSTGRES_USER
          value: "admin"
        - name: POSTGRES_PASSWORD
          value: "password"
        volumeMounts:
        - name: postgres-storage
          mountPath: /var/lib/postgresql/data
      volumes:
      - name: postgres-storage
        persistentVolumeClaim:
          claimName: postgres-pv-claim
replicas: 1 指定了這個應用只會有一個 PostgreSQL 副本運行。POSTGRES_DB、POSTGRES_USER 和 POSTGRES_PASSWORD 是設定資料庫的名字、使用者和密碼。postgres-pv-claim.yaml)這個檔案定義了資料庫的 PersistentVolumeClaim (PVC),用來請求 Kubernetes 中的持久化存儲。
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: postgres-pv-claim
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
10Gi 指的是 10 GB 的存儲空間。app.py)這個 Python 程式使用 Flask 框架來提供 REST API,並與 PostgreSQL 資料庫交互。它能接收交通數據並存儲到資料庫,或查詢並返回存儲的數據。
from flask import Flask, request, jsonify
import psycopg2
app = Flask(__name__)
# 連接 PostgreSQL 資料庫
def connect_db():
    conn = psycopg2.connect(
        dbname="traffic_db",
        user="admin",
        password="password",
        host="postgres",  # 這是 K8s 的 service 名字
        port="5432"
    )
    return conn
connect_db() 函數:使用 psycopg2 來連接到 PostgreSQL 資料庫。host="postgres" 指定了資料庫的服務名稱,這個名稱是由 postgres Kubernetes service 提供的內部 DNS。@app.route('/add_traffic_data', methods=['POST'])
def add_traffic_data():
    data = request.json
    try:
        conn = connect_db()
        cur = conn.cursor()
        # 將數據插入資料庫
        cur.execute("""
            INSERT INTO traffic_data (location, timestamp, vehicle_count)
            VALUES (%s, %s, %s)
        """, (data['location'], data['timestamp'], data['vehicle_count']))
        conn.commit()
        cur.close()
        conn.close()
        return jsonify({"status": "success"}), 201
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/add_traffic_data', methods=['POST']):這個路由會處理 POST 請求,用於接收交通數據並將其存入資料庫。request.json:從 HTTP 請求的 body 中提取 JSON 格式的數據。cur.execute():用 SQL 語句將接收到的 location(地點)、timestamp(時間戳)、vehicle_count(車輛數量)插入到資料庫中的 traffic_data 表格。commit():提交事務,將數據永久寫入資料庫。@app.route('/get_traffic_data', methods=['GET'])
def get_traffic_data():
    location = request.args.get('location')
    try:
        conn = connect_db()
        cur = conn.cursor()
        cur.execute("SELECT * FROM traffic_data WHERE location = %s", (location,))
        rows = cur.fetchall()
        cur.close()
        conn.close()
        return jsonify({"data": rows}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/get_traffic_data', methods=['GET']):這個路由會處理 GET 請求,根據地點 location 查詢資料庫中的交通數據。request.args.get('location'):從 URL 中提取地點參數(location)。cur.execute():執行 SQL 查詢,從資料庫中獲取匹配地點的所有交通數據。fetchall():提取所有匹配的資料,並以 JSON 格式返回。將 Flask 應用打包成 Docker 映像,方便部署到 Kubernetes。
Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "app.py"]
FROM python:3.9-slim:使用輕量版的 Python 基礎映像。WORKDIR /app:設定工作目錄 /app。COPY requirements.txt:將 requirements.txt 複製到 Docker 容器中,這個檔案包含了 Flask 和 psycopg2 的依賴項。RUN pip install:安裝應用所需的依賴項。CMD:告訴 Docker 啟動容器後運行 Flask 應用。flask-deployment.yaml)這段 YAML 文件定義了如何在 Kubernetes 中部署 Flask 應用。
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flask-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flask-app
  template:
    metadata:
      labels:
        app: flask-app
    spec:
      containers:
      - name: flask-app
        image: <your-flask-image>
        ports:
        - containerPort: 5000
---
apiVersion: v1
kind: Service
metadata:
  name: flask-app
spec:
  type: LoadBalancer
  ports:
    - port: 5000
  selector:
    app: flask-app
<your-flask-image> 替換為你實際的 Docker 映像名稱。LoadBalancer 會自動分配一個公共 IP,使得應用可以從外部訪問。在 PostgreSQL 中,你需要創建一個用來保存交通數據的表格:
CREATE TABLE traffic_data (
    id SERIAL PRIMARY KEY,
    location VARCHAR(255),
    timestamp TIMESTAMP,
    vehicle_count INT
);
這個 SQL 語句創建了一個表 traffic_data,其中包含:
id:自增的主鍵。location:交通事件發生的地點。timestamp:記錄的時間戳。vehicle_count:記錄車輛數量。這個系統包含:
Flask 應用:提供兩個 API 端點,一個用來插入交通數據,一個用來查詢交通數據。